[AWS IoT Greengrass V2] ストリームマネージャーを使用してWebカメラの画像を毎秒2フレームでS3に送信してみました
1 はじめに
IoT事業部の平内(SIN)です。
AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。
以前に、このストリームマネージャーを使用して、Kinesis Data Streamsへの送信を試してみました。
今回は、同じくストリームマネージャーを使用していますが、宛先をS3バケットとし、デバイスに接続されたWebカメラの画像をリアルタイムに送信してみました。
2 構成
構成は、以下の通りです。
Greengrassコアでは、2つのカスタムコンポーネントが動作しています。
- 画像取得
- 画像送信
1つ目(画像取得)は、Webカメラの画像をOpenCVで取得し、特定のフォルダに、タイムスタンプ名で、どんどん保存しています。そして2つ目(画像送信)のコンポーネントは、フォルダに置かれた画像を、順次ストリームマネージャーに送信しています。送信が完了した画像は、その時点で削除されます。
後は、ストリームマネージャーが、指定されたS3バケットへの送信をマネージドで行ってくれるという按配です。
3 動作のようす
- Webカメラの前には、トマト羊とアヒルが置かれています。
- 1秒に2フレーム、次々画像がS3バケットに溜まっていきます。
4 画像取得
画像取得を担当しているコンポーネントのコードです。
OpenCVでWebカメラの画像を取得し、2FPSでフォルダ(/tmp/s3_stream)に保存しているだけです。
web_cam.py
import cv2 import datetime import os # Webカメラ DEVICE_ID = 0 WIDTH = 640 HEIGHT = 480 FPS = 20 folder_name = "/tmp/s3_stream" os.makedirs(folder_name, exist_ok=True) def main(): cap = cv2.VideoCapture(DEVICE_ID) # フォーマット・解像度・FPSの設定 cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT) cap.set(cv2.CAP_PROP_FPS, FPS) # フォーマット・解像度・FPSの取得 width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) fps = cap.get(cv2.CAP_PROP_FPS) print("fps:{} width:{} height:{}".format(fps, width, height)) counter = 0 while True: # カメラ画像取得 _, frame = cap.read() if(frame is None): continue # 10フレームに1回保存 if(counter%10==0): dt = datetime.datetime.now() file_name = dt.strftime('%Y_%m_%d_%H.%M.%S.jpg') print(file_name) cv2.imwrite("{}/{}".format(folder_name, file_name), frame) counter += 1 if __name__ == '__main__': main()
そして、レシピです。
--- RecipeFormatVersion: "2020-01-25" ComponentName: "com.example.WebCam" ComponentVersion: "1.0.0" ComponentType: "aws.greengrass.generic" Manifests: - Platform: os: linux Lifecycle: Run: | export LD_PRELOAD=/usr/lib/arm-linux-gnueabihf/libatomic.so.1 python3 -u {artifacts:path}/web_cam.py Artifacts: - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.WebCam/1.0.0/web_cam.py
5 画像送信
画像送信を担当しているコンポーネントのコードです。
ストリームマネージャーを使用して、send_file()で指定されたファイルを送信するクラスが中心となっています。
参考:https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_s3.py
import os import time import glob from stream_manager import ( ExportDefinition, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, S3ExportTaskDefinition, S3ExportTaskExecutorConfig, Status, StatusConfig, StatusLevel, StatusMessage, StrategyOnFull, StreamManagerClient, StreamManagerException, ) from stream_manager.util import Util class S3Stream: def __init__(self, stream_name, bucket_name, folder_name): self.stream_name = stream_name self.status_stream_name = "status" + self.stream_name self.bucket_name = bucket_name self.folder_name = folder_name os.makedirs(self.folder_name, exist_ok=True) self.client = StreamManagerClient() # ストリームが既に存在する場合は、一旦削除する try: self.client.delete_message_stream(stream_name=self.status_stream_name) except ResourceNotFoundException: pass try: self.client.delete_message_stream(stream_name=self.stream_name) except ResourceNotFoundException: pass # ストリーム作成(S3) exports = ExportDefinition( s3_task_executor=[ S3ExportTaskExecutorConfig( identifier="S3TaskExecutor" + stream_name, status_config=StatusConfig( status_level=StatusLevel.INFO, status_stream_name=self.status_stream_name, ), ) ] ) self.client.create_message_stream( MessageStreamDefinition( name=self.status_stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) self.client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # folder_name/file_name を bucket_name/key_name に送信する # 送信完了後、folder_name/file_nameは、削除される def send_file(self, file_name, key_name): print("? send_file: {}/{} to s3://{}/{}".format(self.folder_name, file_name, self.bucket_name, key_name)) # S3タスク定義を追加し、シーケンス番号を出力 input_url = "file:{}/{}".format(self.folder_name, file_name) s3_export_task_definition = S3ExportTaskDefinition(input_url=input_url, bucket=self.bucket_name, key=key_name) ret = self.client.append_message(self.stream_name, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) print("sequence number {}".format(ret)) stop_checking = False next_seq = 0 while not stop_checking: print("next_seq:{}".format(next_seq)) try: messages_list = self.client.read_messages( self.status_stream_name, ReadMessagesOptions( desired_start_sequence_number=next_seq, min_message_count=1, read_timeout_millis=1000 ), ) for message in messages_list: status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) if status_message.status == Status.Success: print("Status.Success") # 送信完了したのファイルを削除する file_path = "{}/{}".format(self.folder_name, file_name) os.remove(file_path) print("{} removed.".format(file_path)) stop_checking = True elif status_message.status == Status.InProgress: print("Status.InProgress.") next_seq = message.sequence_number + 1 elif status_message.status == Status.Failure or status_message.status == Status.Canceled: print("Status.Failure or Canceled") stop_checking = True if not stop_checking: print("not stop_checking. sleep(5)") time.sleep(5) except StreamManagerException: print("Exception while running. sleep(5)") time.sleep(5) print("while out.") def main(): stream_name = "SomeStream" bucket_name = "stream-manager-sample-2021-09-03" folder_name = "/tmp/s3_stream" s3_stream = S3Stream(stream_name, bucket_name, folder_name) # folder_nameの下に、置かれたファイルを、順次S3に送信する while(True): files = glob.glob("{}/*".format(folder_name)) for file in files: file_name = os.path.basename(file) key_name = file_name.replace("_","/") s3_stream.send_file(file_name, key_name) time.sleep(1) main()
レシピです。ストームマネージャーのクライアントSDKも一所に送っていますが、以前、Kinesis Data Streamsで送信した時と同じ要領です。
参考:[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
--- RecipeFormatVersion: "2020-01-25" ComponentName: "com.example.S3Sample" ComponentVersion: "1.0.0" ComponentType: "aws.greengrass.generic" ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: pip3 install --user -r {artifacts:path}/requirements.txt Run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 -u {artifacts:path}/s3_sample.py Artifacts: - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/s3_sample.py - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/requirements.txt
6 デプロイ
2つのコンポーネントをデプロイしています。
デバイス上では、以下のように見えます。
$ sudo /greengrass/v2/bin/greengrass-cli component list ・・・略・・・ Component Name: aws.greengrass.StreamManager Version: 2.0.12 State: RUNNING Configuration: {"JVM_ARGS":"","LOG_LEVEL":"INFO","port":"8088","STREAM_MANAGER_AUTHENTICATE_CLIENT":"true","STREAM_MANAGER_ENABLE_LOCK_ON_METADATA_STORE":"false","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH":"2147483647","STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES":"5242880","STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE":"5","STREAM_MANAGER_SERVER_PORT":"8088","STREAM_MANAGER_STORE_ROOT_DIR":"."} ・・・略・・・ Component Name: com.example.WebCam Version: 1.0.0 State: RUNNING Configuration: {} ・・・略・・・ Component Name: com.example.S3Sample Version: 1.2.3 State: RUNNING Configuration: {} ・・・略・・・
また、下図は、LocalDebugConsoleで、見ている様子です。この画面から、コンポーネントの状態確認や、起動停止が簡単に行えるので、ほんと捗ります。
参考:[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
7 ポリシー
ストリームマネージャー経由でS3へ保存する場合も、バケットに対するPutObject権限が必要です。
ここでは、下記のポリシーを GreengrassV2TokenExchangeRole ロールに追加しています。
stream-manager-sample-2021-09-03
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject" ], "Resource": "arn:aws:s3:::stream-manager-sample-2021-09-03/*" } ] }
8 デバイス上のアクセス権
Greengrassで動作するコンポーネントは、デフォルトでユーザ(ggc_user)、グループ(ggc_user)となっています。 コンポーネントからデバイス上のリソースを使用する場合、Greengrassが動作するユーザーで利用可能になるように構成しておくことが必要です。
今回設定したのは、以下の2点です。
(1) 画像を保管するフォルダのオーナーをggc_userとする
$ ls -la /tmp/s3_stream/ total 764 drwxr-xr-x 2 ggc_user ggc_user 4096 Sep 14 02:26 .
(2) videoのグループにggc_userを追加
カメラデバイスへのアクセスのため
$ cat /etc/group | grep video video:x:44:pi $ sudo usermod -a -G video ggc_user $ cat /etc/group | grep video video:x:44:pi,ggc_user
9 最後に
今回は、「Raspberry Pi でWebカメラの画像をS3へ送信」というような、よく見るパターンのサンプルをGreengrassでやってみました。
ストリームマネージャーを使用することで、ネットワーク切断時の対応、送信スピード(書き込み頻度)、他のストリームとの優先などの実装が必要なくなります。
もう少し、要件を複雑にしたり、環境をいじめると、この辺の効果を実感できるかもしれません。
10 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
[AWS IoT Greengrass V2] プロセス間通信 (IPC) を使用してコンポーネントの設定値を使用してみました